202602021508 python如何在将contextVar带到子线程中

202602021508 python如何在将contextVar带到子线程中

import time  
from concurrent.futures import ThreadPoolExecutor  
from contextvars import copy_context, ContextVar  
  
trace_id = ContextVar("trace_id")  
  
def work(x):  
    time.sleep(x)  
    print("trace_id =", trace_id.get(None), "x =", x)  
  
def submit_with_context(pool, fn, *args, **kwargs):  
    ctx = copy_context()  
    return pool.submit(ctx.run, fn, *args, **kwargs)  
  
trace_id.set("REQ-123")  
  
with ThreadPoolExecutor(max_workers=2) as pool:  
    f1 = submit_with_context(pool, work, 1)  
    f2 = submit_with_context(pool, work, 2)  
    f3 = pool.submit(work, 3)  
  
    f1.result()  
    f2.result()  
    f3.result()

会将ContextVar带到子线程中

输出:

trace_id = REQ-123 x = 1
trace_id = REQ-123 x = 2
trace_id = None x = 3

特别适合在做trace的时候,将 contextVar 传到子线程中

在代码库中包装一层:

"""上下文感知的并发工具,解决 ThreadPoolExecutor 中 contextvars 不自动传播的问题。"""

import contextvars
from concurrent.futures import ThreadPoolExecutor, Future
from typing import Callable


class ContextAwareThreadPoolExecutor(ThreadPoolExecutor):
    """自动传播 contextvars 上下文到工作线程的 ThreadPoolExecutor, 保留主进程logid,trace等信息。
    用法与标准 ThreadPoolExecutor 完全一致,只需替换 import 即可。
    """

    def submit(self, fn: Callable, /, *args, **kwargs) -> Future:
        """提交任务时自动捕获当前上下文,使工作线程继承调用方的 contextvars。"""
        ctx = contextvars.copy_context()
        return super().submit(ctx.run, fn, *args, **kwargs)